• <sub id="h4knl"><ol id="h4knl"></ol></sub>
    <sup id="h4knl"></sup>
      <sub id="h4knl"></sub>

      <sub id="h4knl"><ol id="h4knl"><em id="h4knl"></em></ol></sub><s id="h4knl"></s>
      1. <strong id="h4knl"></strong>

      2. 如何解決Java Socket通信技術收發線程互斥

        時間:2024-08-28 18:14:33 SUN認證 我要投稿
        • 相關推薦

        如何解決Java Socket通信技術收發線程互斥

          Java Socket通信技術在很長的時間里都在使用,在不少的程序員眼中都有很多高的評價。那么下面我們就看看如何才能掌握這門復雜的編程語言,希望大家在今后的Java Socket通信技術使用中有所收獲。

          下面就是Java Socket通信技術在解決收發線程互斥的代碼介紹。

          1.package com.bill99.svr;

          2.import java.io.IOException;

          3.import java.io.InputStream;

          4.import java.io.OutputStream;

          5.import java.net.InetSocketAddress;

          6.import java.net.Socket;

          7.import java.net.SocketException;

          8.import java.net.SocketTimeoutException;

          9.import java.text.SimpleDateFormat;

          10.import java.util.Date;

          11.import java.util.Properties;

          12.import java.util.Timer;

          13.import java.util.TimerTask;

          14.import java.util.concurrent.ConcurrentHashMap;

          15.import java.util.concurrent.TimeUnit;

          16.import java.util.concurrent.locks.Condition;

          17.import java.util.concurrent.locks.ReentrantLock;

          18.import org.apache.log4j.Logger;

          19./**

          20.*

        title: socket通信包裝類

         

          21.*

        Description:

         

          22.*

        CopyRight: CopyRight (c) 2009

         

          23.*

        Company: 99bill.com

         

          24.*

        Create date: 2009-10-14

         

          25.*author sunnylocus

          26. * v0.10 2009-10-14 初類

          27.* v0.11 2009-11-12 對命令收發邏輯及收發線程互斥機制進行了優化,

          處理命令速度由原來8~16個/秒提高到25~32個/秒

          28.*/ public class SocketConnection {

          29.private volatile Socket socket;

          30.private int timeout = 1000*10; //超時時間,初始值10秒

          31.private boolean isLaunchHeartcheck = false;//是否已啟動心跳檢測

          32.private boolean isNetworkConnect = false; //網絡是否已連接

          33.private static String host = "";

          34.private static int port;

          35.static InputStream inStream = null;

          36.static OutputStream outStream = null;

          37.private static Logger log =Logger.getLogger

          (SocketConnection.class);

          38.private static SocketConnection socketConnection = null;

          39.private static java.util.Timer heartTimer=null;

          40.//private final Map recMsgMap= Collections.

          synchronizedMap(new HashMap());

          41.private final ConcurrentHashMap recMsgMap

          = new ConcurrentHashMap();

          42.private static Thread receiveThread = null;

          43.private final ReentrantLock lock = new ReentrantLock();

          44.private SocketConnection(){

          45.Properties conf = new Properties();

          46.try {

          47.conf.load(SocketConnection.class.getResourceAsStream

          ("test.conf"));

          48.this.timeout = Integer.valueOf(conf.getProperty("timeout"));

          49.init(conf.getProperty("ip"),Integer.valueOf

          (conf.getProperty("port")));

          50.} catch(IOException e) {

          51.log.fatal("socket初始化異常!",e);

          52.throw new RuntimeException("socket初始化異常,請檢查配置參數");

          53.}

          54.}

          55./**

          56.* 單態模式

          57.*/

          58.public static SocketConnection getInstance() {

          59.if(socketConnection==null) {

          60.synchronized(SocketConnection.class) {

          61.if(socketConnection==null) {

          62.socketConnection = new SocketConnection();

          63.return socketConnection;

          64.}

          65.}

          66.}

          67.return socketConnection;

          68.}

          69.private void init(String host,int port) throws IOException {

          70.InetSocketAddress addr = new InetSocketAddress(host,port);

          71.socket = new Socket();

          72.synchronized (this) {

          73.log.info("【準備與"+addr+"建立連接】");

          74.socket.connect(addr, timeout);

          75.log.info("【與"+addr+"連接已建立】");

          76.inStream = socket.getInputStream();

          77.outStream = socket.getOutputStream();

          78.socket.setTcpNoDelay(true);//數據不作緩沖,立即發送

          79.socket.setSoLinger(true, 0);//socket關閉時,立即釋放資源

          80.socket.setKeepAlive(true);

          81.socket.setTrafficClass(0x04|0x10);//高可靠性和最小延遲傳輸

          82.isNetworkConnect=true;

          83.receiveThread = new Thread(new ReceiveWorker());

          84.receiveThread.start();

          85.SocketConnection.host=host;

          86.SocketConnection.port=port;

          87.if(!isLaunchHeartcheck)

          88.launchHeartcheck();

          89.}

          90.}

          91./**

          92.* 心跳包檢測

          93.*/

          94.private void launchHeartcheck() {

          95.if(socket == null)

          96.throw new IllegalStateException("socket is not

          established!");

          97.heartTimer = new Timer();

          98.isLaunchHeartcheck = true;

          99.heartTimer.schedule(new TimerTask() {

          100.public void run() {

          101.String msgStreamNo = StreamNoGenerator.getStreamNo("kq");

          102.int mstType =9999;//999-心跳包請求

          103.SimpleDateFormat dateformate = new SimpleDateFormat

          ("yyyyMMddHHmmss");

          104.String msgDateTime = dateformate.format(new Date());

          105.int msgLength =38;//消息頭長度

          106.String commandstr = "00" +msgLength + mstType + msgStreamNo;

          107.log.info("心跳檢測包 -> IVR "+commandstr);

          108.int reconnCounter = 1;

          109.while(true) {

          110.String responseMsg =null;

          111.try {

          112.responseMsg = readReqMsg(commandstr);

          113.} catch (IOException e) {

          114.log.error("IO流異常",e);

          115.reconnCounter ++;

          116.}

          117.if(responseMsg!=null) {

          118.log.info("心跳響應包 <- IVR "+responseMsg);

          119.reconnCounter = 1;

          120.break;

          121.} else {

          122.reconnCounter ++;

          123.}

          124.if(reconnCounter >3) {//重連次數已達三次,判定網絡連接中斷,

          重新建立連接。連接未被建立時不釋放鎖

          125.reConnectToCTCC(); break;

          126.}

          127.}

          128.}

          129.},1000 * 60*1,1000*60*2);

          130.}

          131./**

          132.* 重連與目標IP建立重連

          133.*/

          134.private void reConnectToCTCC() {

          135.new Thread(new Runnable(){

          136.public void run(){

          137.log.info("重新建立與"+host+":"+port+"的連接");

          138.//清理工作,中斷計時器,中斷接收線程,恢復初始變量

          139.heartTimer.cancel();

          140.isLaunchHeartcheck=false;

          141.isNetworkConnect = false;

          142.receiveThread.interrupt();

          143.try {

          144.socket.close();

          145.} catch (IOException e1) {log.error("重連時,關閉socket連

          接發生IO流異常",e1);}

          146.//----------------

          147.synchronized(this){

          148.for(; ;){

          149.try {

          150.Thread.currentThread();

          151.Thread.sleep(1000 * 1);

          152.init(host,port);

          153.this.notifyAll();

          154.break ;

          155.} catch (IOException e) {

          156.log.error("重新建立連接未成功",e);

          157.} catch (InterruptedException e){

          158.log.error("重連線程中斷",e);

          159.}

          160.}

          161.}

          162.}

          163.}).start();

          164.}

          165./**

          166.* 發送命令并接受響應

          167.* @param requestMsg

          168.* @return

          169.* @throws SocketTimeoutException

          170.* @throws IOException

          171.*/

          172.public String readReqMsg(String requestMsg) throws IOException {

          173.if(requestMsg ==null) {

          174.return null;

          175.}

          176.if(!isNetworkConnect) {

          177.synchronized(this){

          178.try {

          179.this.wait(1000*5); //等待5秒,如果網絡還沒有恢復,拋出IO流異常

          180.if(!isNetworkConnect) {

          181.throw new IOException("網絡連接中斷!");

          182.}

          183.} catch (InterruptedException e) {

          184.log.error("發送線程中斷",e);

          185.}

          186.}

          187.}

          188.String msgNo = requestMsg.substring(8, 8 + 24);//讀取流水號

          189.outStream = socket.getOutputStream();

          190.outStream.write(requestMsg.getBytes());

          191.outStream.flush();

          192.Condition msglock = lock.newCondition(); //消息鎖

          193.//注冊等待接收消息

          194.recMsgMap.put(msgNo, msglock);

          195.try {

          196.lock.lock();

          197.msglock.await(timeout,TimeUnit.MILLISECONDS);

          198.} catch (InterruptedException e) {

          199.log.error("發送線程中斷",e);

          200.} finally {

          201.lock.unlock();

          202.}

          203.Object respMsg = recMsgMap.remove(msgNo); //響應信息

          204.if(respMsg!=null &&(respMsg != msglock)) {

          205.//已經接收到消息,注銷等待,成功返回消息

          206.return (String) respMsg;

          207.} else {

          208.log.error(msgNo+" 超時,未收到響應消息");

          209.throw new SocketTimeoutException(msgNo+" 超時,未收到響應消息");

          210.}

          211.}

          212.public void finalize() {

          213.if (socket != null) {

          214.try {

          215.socket.close();

          216.} catch (IOException e) {

          217.e.printStackTrace();

          218.}

          219.}

          220.}

          221.//消息接收線程

          222.private class ReceiveWorker implements Runnable {

          223.String intStr= null;

          224.public void run() {

          225.while(!Thread.interrupted()){

          226.try {

          227.byte[] headBytes = new byte[4];

          228.if(inStream.read(headBytes)==-1){

          229.log.warn("讀到流未尾,對方已關閉流!");

          230.reConnectToCTCC();//讀到流未尾,對方已關閉流

          231.return;

          232.}

          233.byte[] tmp =new byte[4];

          234.tmp = headBytes;

          235.String tempStr = new String(tmp).trim();

          236.if(tempStr==null || tempStr.equals("")) {

          237.log.error("received message is null");

          238.continue;

          239.}

          240.intStr = new String(tmp);

          241.int totalLength =Integer.parseInt(intStr);

          242.//----------------

          243.byte[] msgBytes = new byte[totalLength-4];

          244.inStream.read(msgBytes);

          245.String resultMsg = new String(headBytes)+ new

          String(msgBytes);

          246.//抽出消息ID

          247.String msgNo = resultMsg.substring(8, 8 + 24);

          248.Condition msglock =(Condition) recMsgMap.get(msgNo);

          249.if(msglock ==null) {

          250.log.warn(msgNo+"序號可能已被注銷!響應消息丟棄");

          251.recMsgMap.remove(msgNo);

          252.continue;

          253.}

          254.recMsgMap.put(msgNo, resultMsg);

          255.try{

          256.lock.lock();

          257.msglock.signalAll();

          258.}finally {

          259.lock.unlock();

          260.}

          261.}catch(SocketException e){

          262.log.error("服務端關閉socket",e);

          263.reConnectToCTCC();

          264.} catch(IOException e) {

          265.log.error("接收線程讀取響應數據時發生IO流異常",e);

          266.} catch(NumberFormatException e){

          267.log.error("收到沒良心包,String轉int異常,異常字符:"+intStr);

          268.}

          269.}

          270.}

          271.}

          272.}

        《&.doc》
        将本文的Word文档下载到电脑,方便收藏和打印
        推荐度:
        点击下载文档

        【如何解決Java Socket通信技術收發線程互斥】相關文章:

        PHP中如何使用socket進行通信08-21

        Java線程同步的方法10-25

        Java多線程的實現方式07-08

        java多線程面試題201710-03

        2016年java多線程面試題及答案07-02

        sun認證考試輔導:java關于多線程的部分操作07-27

        PHP socket的配置08-04

        超線程技術是什么意思09-09

        如何編譯java程序09-28

        如何讓JAVA代碼更高效07-18

        国产高潮无套免费视频_久久九九兔免费精品6_99精品热6080YY久久_国产91久久久久久无码
      3. <sub id="h4knl"><ol id="h4knl"></ol></sub>
        <sup id="h4knl"></sup>
          <sub id="h4knl"></sub>

          <sub id="h4knl"><ol id="h4knl"><em id="h4knl"></em></ol></sub><s id="h4knl"></s>
          1. <strong id="h4knl"></strong>

          2. 思思久久er99精品 | 日本免费在线观看A∨ | 伊人狼人久久青青草原 | 中文字幕久久天堂一区二区 | 日韩精品久久久免费观看 | 亚洲福利高清视频 |

            如何解決Java Socket通信技術收發線程互斥

              Java Socket通信技術在很長的時間里都在使用,在不少的程序員眼中都有很多高的評價。那么下面我們就看看如何才能掌握這門復雜的編程語言,希望大家在今后的Java Socket通信技術使用中有所收獲。

              下面就是Java Socket通信技術在解決收發線程互斥的代碼介紹。

              1.package com.bill99.svr;

              2.import java.io.IOException;

              3.import java.io.InputStream;

              4.import java.io.OutputStream;

              5.import java.net.InetSocketAddress;

              6.import java.net.Socket;

              7.import java.net.SocketException;

              8.import java.net.SocketTimeoutException;

              9.import java.text.SimpleDateFormat;

              10.import java.util.Date;

              11.import java.util.Properties;

              12.import java.util.Timer;

              13.import java.util.TimerTask;

              14.import java.util.concurrent.ConcurrentHashMap;

              15.import java.util.concurrent.TimeUnit;

              16.import java.util.concurrent.locks.Condition;

              17.import java.util.concurrent.locks.ReentrantLock;

              18.import org.apache.log4j.Logger;

              19./**

              20.*

            title: socket通信包裝類

             

              21.*

            Description:

             

              22.*

            CopyRight: CopyRight (c) 2009

             

              23.*

            Company: 99bill.com

             

              24.*

            Create date: 2009-10-14

             

              25.*author sunnylocus

              26. * v0.10 2009-10-14 初類

              27.* v0.11 2009-11-12 對命令收發邏輯及收發線程互斥機制進行了優化,

              處理命令速度由原來8~16個/秒提高到25~32個/秒

              28.*/ public class SocketConnection {

              29.private volatile Socket socket;

              30.private int timeout = 1000*10; //超時時間,初始值10秒

              31.private boolean isLaunchHeartcheck = false;//是否已啟動心跳檢測

              32.private boolean isNetworkConnect = false; //網絡是否已連接

              33.private static String host = "";

              34.private static int port;

              35.static InputStream inStream = null;

              36.static OutputStream outStream = null;

              37.private static Logger log =Logger.getLogger

              (SocketConnection.class);

              38.private static SocketConnection socketConnection = null;

              39.private static java.util.Timer heartTimer=null;

              40.//private final Map recMsgMap= Collections.

              synchronizedMap(new HashMap());

              41.private final ConcurrentHashMap recMsgMap

              = new ConcurrentHashMap();

              42.private static Thread receiveThread = null;

              43.private final ReentrantLock lock = new ReentrantLock();

              44.private SocketConnection(){

              45.Properties conf = new Properties();

              46.try {

              47.conf.load(SocketConnection.class.getResourceAsStream

              ("test.conf"));

              48.this.timeout = Integer.valueOf(conf.getProperty("timeout"));

              49.init(conf.getProperty("ip"),Integer.valueOf

              (conf.getProperty("port")));

              50.} catch(IOException e) {

              51.log.fatal("socket初始化異常!",e);

              52.throw new RuntimeException("socket初始化異常,請檢查配置參數");

              53.}

              54.}

              55./**

              56.* 單態模式

              57.*/

              58.public static SocketConnection getInstance() {

              59.if(socketConnection==null) {

              60.synchronized(SocketConnection.class) {

              61.if(socketConnection==null) {

              62.socketConnection = new SocketConnection();

              63.return socketConnection;

              64.}

              65.}

              66.}

              67.return socketConnection;

              68.}

              69.private void init(String host,int port) throws IOException {

              70.InetSocketAddress addr = new InetSocketAddress(host,port);

              71.socket = new Socket();

              72.synchronized (this) {

              73.log.info("【準備與"+addr+"建立連接】");

              74.socket.connect(addr, timeout);

              75.log.info("【與"+addr+"連接已建立】");

              76.inStream = socket.getInputStream();

              77.outStream = socket.getOutputStream();

              78.socket.setTcpNoDelay(true);//數據不作緩沖,立即發送

              79.socket.setSoLinger(true, 0);//socket關閉時,立即釋放資源

              80.socket.setKeepAlive(true);

              81.socket.setTrafficClass(0x04|0x10);//高可靠性和最小延遲傳輸

              82.isNetworkConnect=true;

              83.receiveThread = new Thread(new ReceiveWorker());

              84.receiveThread.start();

              85.SocketConnection.host=host;

              86.SocketConnection.port=port;

              87.if(!isLaunchHeartcheck)

              88.launchHeartcheck();

              89.}

              90.}

              91./**

              92.* 心跳包檢測

              93.*/

              94.private void launchHeartcheck() {

              95.if(socket == null)

              96.throw new IllegalStateException("socket is not

              established!");

              97.heartTimer = new Timer();

              98.isLaunchHeartcheck = true;

              99.heartTimer.schedule(new TimerTask() {

              100.public void run() {

              101.String msgStreamNo = StreamNoGenerator.getStreamNo("kq");

              102.int mstType =9999;//999-心跳包請求

              103.SimpleDateFormat dateformate = new SimpleDateFormat

              ("yyyyMMddHHmmss");

              104.String msgDateTime = dateformate.format(new Date());

              105.int msgLength =38;//消息頭長度

              106.String commandstr = "00" +msgLength + mstType + msgStreamNo;

              107.log.info("心跳檢測包 -> IVR "+commandstr);

              108.int reconnCounter = 1;

              109.while(true) {

              110.String responseMsg =null;

              111.try {

              112.responseMsg = readReqMsg(commandstr);

              113.} catch (IOException e) {

              114.log.error("IO流異常",e);

              115.reconnCounter ++;

              116.}

              117.if(responseMsg!=null) {

              118.log.info("心跳響應包 <- IVR "+responseMsg);

              119.reconnCounter = 1;

              120.break;

              121.} else {

              122.reconnCounter ++;

              123.}

              124.if(reconnCounter >3) {//重連次數已達三次,判定網絡連接中斷,

              重新建立連接。連接未被建立時不釋放鎖

              125.reConnectToCTCC(); break;

              126.}

              127.}

              128.}

              129.},1000 * 60*1,1000*60*2);

              130.}

              131./**

              132.* 重連與目標IP建立重連

              133.*/

              134.private void reConnectToCTCC() {

              135.new Thread(new Runnable(){

              136.public void run(){

              137.log.info("重新建立與"+host+":"+port+"的連接");

              138.//清理工作,中斷計時器,中斷接收線程,恢復初始變量

              139.heartTimer.cancel();

              140.isLaunchHeartcheck=false;

              141.isNetworkConnect = false;

              142.receiveThread.interrupt();

              143.try {

              144.socket.close();

              145.} catch (IOException e1) {log.error("重連時,關閉socket連

              接發生IO流異常",e1);}

              146.//----------------

              147.synchronized(this){

              148.for(; ;){

              149.try {

              150.Thread.currentThread();

              151.Thread.sleep(1000 * 1);

              152.init(host,port);

              153.this.notifyAll();

              154.break ;

              155.} catch (IOException e) {

              156.log.error("重新建立連接未成功",e);

              157.} catch (InterruptedException e){

              158.log.error("重連線程中斷",e);

              159.}

              160.}

              161.}

              162.}

              163.}).start();

              164.}

              165./**

              166.* 發送命令并接受響應

              167.* @param requestMsg

              168.* @return

              169.* @throws SocketTimeoutException

              170.* @throws IOException

              171.*/

              172.public String readReqMsg(String requestMsg) throws IOException {

              173.if(requestMsg ==null) {

              174.return null;

              175.}

              176.if(!isNetworkConnect) {

              177.synchronized(this){

              178.try {

              179.this.wait(1000*5); //等待5秒,如果網絡還沒有恢復,拋出IO流異常

              180.if(!isNetworkConnect) {

              181.throw new IOException("網絡連接中斷!");

              182.}

              183.} catch (InterruptedException e) {

              184.log.error("發送線程中斷",e);

              185.}

              186.}

              187.}

              188.String msgNo = requestMsg.substring(8, 8 + 24);//讀取流水號

              189.outStream = socket.getOutputStream();

              190.outStream.write(requestMsg.getBytes());

              191.outStream.flush();

              192.Condition msglock = lock.newCondition(); //消息鎖

              193.//注冊等待接收消息

              194.recMsgMap.put(msgNo, msglock);

              195.try {

              196.lock.lock();

              197.msglock.await(timeout,TimeUnit.MILLISECONDS);

              198.} catch (InterruptedException e) {

              199.log.error("發送線程中斷",e);

              200.} finally {

              201.lock.unlock();

              202.}

              203.Object respMsg = recMsgMap.remove(msgNo); //響應信息

              204.if(respMsg!=null &&(respMsg != msglock)) {

              205.//已經接收到消息,注銷等待,成功返回消息

              206.return (String) respMsg;

              207.} else {

              208.log.error(msgNo+" 超時,未收到響應消息");

              209.throw new SocketTimeoutException(msgNo+" 超時,未收到響應消息");

              210.}

              211.}

              212.public void finalize() {

              213.if (socket != null) {

              214.try {

              215.socket.close();

              216.} catch (IOException e) {

              217.e.printStackTrace();

              218.}

              219.}

              220.}

              221.//消息接收線程

              222.private class ReceiveWorker implements Runnable {

              223.String intStr= null;

              224.public void run() {

              225.while(!Thread.interrupted()){

              226.try {

              227.byte[] headBytes = new byte[4];

              228.if(inStream.read(headBytes)==-1){

              229.log.warn("讀到流未尾,對方已關閉流!");

              230.reConnectToCTCC();//讀到流未尾,對方已關閉流

              231.return;

              232.}

              233.byte[] tmp =new byte[4];

              234.tmp = headBytes;

              235.String tempStr = new String(tmp).trim();

              236.if(tempStr==null || tempStr.equals("")) {

              237.log.error("received message is null");

              238.continue;

              239.}

              240.intStr = new String(tmp);

              241.int totalLength =Integer.parseInt(intStr);

              242.//----------------

              243.byte[] msgBytes = new byte[totalLength-4];

              244.inStream.read(msgBytes);

              245.String resultMsg = new String(headBytes)+ new

              String(msgBytes);

              246.//抽出消息ID

              247.String msgNo = resultMsg.substring(8, 8 + 24);

              248.Condition msglock =(Condition) recMsgMap.get(msgNo);

              249.if(msglock ==null) {

              250.log.warn(msgNo+"序號可能已被注銷!響應消息丟棄");

              251.recMsgMap.remove(msgNo);

              252.continue;

              253.}

              254.recMsgMap.put(msgNo, resultMsg);

              255.try{

              256.lock.lock();

              257.msglock.signalAll();

              258.}finally {

              259.lock.unlock();

              260.}

              261.}catch(SocketException e){

              262.log.error("服務端關閉socket",e);

              263.reConnectToCTCC();

              264.} catch(IOException e) {

              265.log.error("接收線程讀取響應數據時發生IO流異常",e);

              266.} catch(NumberFormatException e){

              267.log.error("收到沒良心包,String轉int異常,異常字符:"+intStr);

              268.}

              269.}

              270.}

              271.}

              272.}